/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.examples;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ArrayUtils;

import javax.ws.rs.core.MultivaluedMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF;

/**
 * A driver that sets up sample types and entities using v2 types and entity model for testing purposes.
 */
public class QuickStartV2 {
    public static final String ATLAS_REST_ADDRESS          = "atlas.rest.address";

    public static final String SALES_DB                    = "Sales";
    public static final String REPORTING_DB                = "Reporting";
    public static final String LOGGING_DB                  = "Logging";

    public static final String SALES_FACT_TABLE            = "sales_fact";
    public static final String PRODUCT_DIM_TABLE           = "product_dim";
    public static final String CUSTOMER_DIM_TABLE          = "customer_dim";
    public static final String TIME_DIM_TABLE              = "time_dim";
    public static final String SALES_FACT_DAILY_MV_TABLE   = "sales_fact_daily_mv";
    public static final String SALES_FACT_MONTHLY_MV_TABLE = "sales_fact_monthly_mv";
    public static final String LOG_FACT_DAILY_MV_TABLE     = "log_fact_daily_mv";
    public static final String LOG_FACT_MONTHLY_MV_TABLE   = "logging_fact_monthly_mv";

    public static final String TIME_ID_COLUMN              = "time_id";
    public static final String PRODUCT_ID_COLUMN           = "product_id";
    public static final String CUSTOMER_ID_COLUMN          = "customer_id";
    public static final String APP_ID_COLUMN               = "app_id";
    public static final String MACHINE_ID_COLUMN           = "machine_id";
    public static final String PRODUCT_NAME_COLUMN         = "product_name";
    public static final String BRAND_NAME_COLUMN           = "brand_name";
    public static final String NAME_COLUMN                 = "name";
    public static final String SALES_COLUMN                = "sales";
    public static final String LOG_COLUMN                  = "log";
    public static final String ADDRESS_COLUMN              = "address";
    public static final String DAY_OF_YEAR_COLUMN          = "dayOfYear";
    public static final String WEEKDAY_COLUMN              = "weekDay";

    public static final String DIMENSION_CLASSIFICATION    = "Dimension";
    public static final String FACT_CLASSIFICATION         = "Fact";
    public static final String PII_CLASSIFICATION          = "PII";
    public static final String METRIC_CLASSIFICATION       = "Metric";
    public static final String ETL_CLASSIFICATION          = "ETL";
    public static final String JDBC_CLASSIFICATION         = "JdbcAccess";
    public static final String LOGDATA_CLASSIFICATION      = "Log Data";

    public static final String LOAD_SALES_DAILY_PROCESS    = "loadSalesDaily";
    public static final String LOAD_SALES_MONTHLY_PROCESS  = "loadSalesMonthly";
    public static final String LOAD_LOGS_MONTHLY_PROCESS   = "loadLogsMonthly";

    public static final String PRODUCT_DIM_VIEW            = "product_dim_view";
    public static final String CUSTOMER_DIM_VIEW           = "customer_dim_view";

    public static final String DATABASE_TYPE               = "DB";
    public static final String COLUMN_TYPE                 = "Column";
    public static final String TABLE_TYPE                  = "Table";
    public static final String VIEW_TYPE                   = "View";
    public static final String LOAD_PROCESS_TYPE           = "LoadProcess";
    public static final String STORAGE_DESC_TYPE           = "StorageDesc";

    public static final String[] TYPES = { DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, LOAD_PROCESS_TYPE,
                                           VIEW_TYPE, JDBC_CLASSIFICATION, ETL_CLASSIFICATION, METRIC_CLASSIFICATION,
                                           PII_CLASSIFICATION, FACT_CLASSIFICATION, DIMENSION_CLASSIFICATION, LOGDATA_CLASSIFICATION };

    public static void main(String[] args) throws Exception {
        String[] basicAuthUsernamePassword = null;

        if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
            basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
        }

        runQuickstart(args, basicAuthUsernamePassword);
    }

    @VisibleForTesting
    static void runQuickstart(String[] args, String[] basicAuthUsernamePassword) throws Exception {
        String[] urls = getServerUrl(args);
        QuickStartV2 quickStartV2;

        if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
            quickStartV2 = new QuickStartV2(urls, basicAuthUsernamePassword);
        } else {
            quickStartV2 = new QuickStartV2(urls);
        }

        // Shows how to create v2 types in Atlas for your meta model
        quickStartV2.createTypes();

        // Shows how to create v2 entities (instances) for the added types in Atlas
        quickStartV2.createEntities();

        // Shows some search queries using DSL based on types
        quickStartV2.search();

        // Shows some lineage information on entity
        quickStartV2.lineage();
        
    }

    static String[] getServerUrl(String[] args) throws AtlasException {
        if (args.length > 0) {
            return args[0].split(",");
        }

        Configuration configuration = ApplicationProperties.get();
        String[] urls = configuration.getStringArray(ATLAS_REST_ADDRESS);

        if (ArrayUtils.isEmpty(urls)) {
            System.out.println("org.apache.atlas.examples.QuickStartV2 <Atlas REST address <http/https>://<atlas-fqdn>:<atlas-port> like http://localhost:21000>");
            System.exit(-1);
        }

        return urls;
    }

    private final AtlasClientV2 atlasClientV2;

    QuickStartV2(String[] urls, String[] basicAuthUsernamePassword) {
        atlasClientV2 = new AtlasClientV2(urls,basicAuthUsernamePassword);
    }

    QuickStartV2(String[] urls) throws AtlasException {
        atlasClientV2 = new AtlasClientV2(urls);
    }


    void createTypes() throws Exception {
        AtlasTypesDef atlasTypesDef = createTypeDefinitions();

        System.out.println("\nCreating sample types: ");
        atlasClientV2.createAtlasTypeDefs(atlasTypesDef);

        verifyTypesCreated();
    }

    AtlasTypesDef createTypeDefinitions() throws Exception {
        AtlasEntityDef dbType   = AtlasTypeUtil.createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, "1.0", null,
                                  AtlasTypeUtil.createUniqueRequiredAttrDef("name", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("description", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("locationUri", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("owner", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("createTime", "long"));

        AtlasEntityDef sdType   = AtlasTypeUtil.createClassTypeDef(STORAGE_DESC_TYPE, STORAGE_DESC_TYPE, "1.0", null,
                                  AtlasTypeUtil.createOptionalAttrDefWithConstraint("table", TABLE_TYPE, CONSTRAINT_TYPE_INVERSE_REF,
                                          new HashMap<String, Object>() {{ put(CONSTRAINT_PARAM_ATTRIBUTE, "sd"); }}),
                                  AtlasTypeUtil.createOptionalAttrDef("location", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("inputFormat", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("outputFormat", "string"),
                                  AtlasTypeUtil.createRequiredAttrDef("compressed", "boolean"));

        AtlasEntityDef colType  = AtlasTypeUtil.createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, "1.0", null,
                                  AtlasTypeUtil.createOptionalAttrDef("name", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("dataType", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("comment", "string"),
                                  AtlasTypeUtil.createOptionalAttrDefWithConstraint("table", TABLE_TYPE, CONSTRAINT_TYPE_INVERSE_REF,
                                          new HashMap<String, Object>() {{ put(CONSTRAINT_PARAM_ATTRIBUTE, "columns"); }}));

        colType.setOptions(new HashMap<String, String>() {{ put("schemaAttributes", "[\"name\", \"description\", \"owner\", \"type\", \"comment\", \"position\"]"); }});

        AtlasEntityDef tblType  = AtlasTypeUtil.createClassTypeDef(TABLE_TYPE, TABLE_TYPE, "1.0", ImmutableSet.of("DataSet"),
                                  AtlasTypeUtil.createRequiredAttrDef("db", DATABASE_TYPE),
                                  AtlasTypeUtil.createRequiredAttrDefWithConstraint("sd", STORAGE_DESC_TYPE, CONSTRAINT_TYPE_OWNED_REF, null),
                                  AtlasTypeUtil.createOptionalAttrDef("owner", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("createTime", "long"),
                                  AtlasTypeUtil.createOptionalAttrDef("lastAccessTime", "long"),
                                  AtlasTypeUtil.createOptionalAttrDef("retention", "long"),
                                  AtlasTypeUtil.createOptionalAttrDef("viewOriginalText", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("viewExpandedText", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("tableType", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("temporary", "boolean"),
                                  AtlasTypeUtil.createRequiredListAttrDefWithConstraint("columns", AtlasBaseTypeDef.getArrayTypeName(COLUMN_TYPE),
                                          CONSTRAINT_TYPE_OWNED_REF, null));

        tblType.setOptions(new HashMap<String, String>() {{ put("schemaElementsAttribute", "columns"); }});

        AtlasEntityDef procType = AtlasTypeUtil.createClassTypeDef(LOAD_PROCESS_TYPE, LOAD_PROCESS_TYPE, "1.0", ImmutableSet.of("Process"),
                                  AtlasTypeUtil.createOptionalAttrDef("userName", "string"),
                                  AtlasTypeUtil.createOptionalAttrDef("startTime", "long"),
                                  AtlasTypeUtil.createOptionalAttrDef("endTime", "long"),
                                  AtlasTypeUtil.createRequiredAttrDef("queryText", "string"),
                                  AtlasTypeUtil.createRequiredAttrDef("queryPlan", "string"),
                                  AtlasTypeUtil.createRequiredAttrDef("queryId", "string"),
                                  AtlasTypeUtil.createRequiredAttrDef("queryGraph", "string"));

        AtlasEntityDef viewType = AtlasTypeUtil.createClassTypeDef(VIEW_TYPE, VIEW_TYPE, "1.0", ImmutableSet.of("DataSet"),
                                  AtlasTypeUtil.createRequiredAttrDef("db", DATABASE_TYPE),
                                  AtlasTypeUtil.createOptionalListAttrDef("inputTables", AtlasBaseTypeDef.getArrayTypeName(TABLE_TYPE)));

        AtlasClassificationDef dimClassifDef    = AtlasTypeUtil.createTraitTypeDef(DIMENSION_CLASSIFICATION,  "Dimension Classification", "1.0", ImmutableSet.<String>of());
        AtlasClassificationDef factClassifDef   = AtlasTypeUtil.createTraitTypeDef(FACT_CLASSIFICATION, "Fact Classification", "1.0", ImmutableSet.<String>of());
        AtlasClassificationDef piiClassifDef    = AtlasTypeUtil.createTraitTypeDef(PII_CLASSIFICATION, "PII Classification", "1.0", ImmutableSet.<String>of());
        AtlasClassificationDef metricClassifDef = AtlasTypeUtil.createTraitTypeDef(METRIC_CLASSIFICATION, "Metric Classification", "1.0", ImmutableSet.<String>of());
        AtlasClassificationDef etlClassifDef    = AtlasTypeUtil.createTraitTypeDef(ETL_CLASSIFICATION, "ETL Classification", "1.0", ImmutableSet.<String>of());
        AtlasClassificationDef jdbcClassifDef   = AtlasTypeUtil.createTraitTypeDef(JDBC_CLASSIFICATION, "JdbcAccess Classification", "1.0", ImmutableSet.<String>of());
        AtlasClassificationDef logClassifDef    = AtlasTypeUtil.createTraitTypeDef(LOGDATA_CLASSIFICATION, "LogData Classification", "1.0", ImmutableSet.<String>of());

        return AtlasTypeUtil.getTypesDef(ImmutableList.<AtlasEnumDef>of(),
                                         ImmutableList.<AtlasStructDef>of(),
                                         ImmutableList.of(dimClassifDef, factClassifDef, piiClassifDef, metricClassifDef, etlClassifDef, jdbcClassifDef, logClassifDef),
                                         ImmutableList.of(dbType, sdType, colType, tblType, procType, viewType));
    }

    void createEntities() throws Exception {
        System.out.println("\nCreating sample entities: ");

        // Database entities
        AtlasEntity salesDB     = createDatabase(SALES_DB, "sales database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
        AtlasEntity reportingDB = createDatabase(REPORTING_DB, "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
        AtlasEntity logDB       = createDatabase(LOGGING_DB, "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");

        // Storage Descriptor entities
        AtlasEntity storageDesc = createStorageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true);

        // Column entities
        List<AtlasEntity> salesFactColumns   = ImmutableList.of(createColumn(TIME_ID_COLUMN, "int", "time id"),
                                                                createColumn(PRODUCT_ID_COLUMN, "int", "product id"),
                                                                createColumn(CUSTOMER_ID_COLUMN, "int", "customer id", PII_CLASSIFICATION),
                                                                createColumn(SALES_COLUMN, "double", "product id", METRIC_CLASSIFICATION));

        List<AtlasEntity> logFactColumns     = ImmutableList.of(createColumn(TIME_ID_COLUMN, "int", "time id"),
                                                                createColumn(APP_ID_COLUMN, "int", "app id"),
                                                                createColumn(MACHINE_ID_COLUMN, "int", "machine id"),
                                                                createColumn(LOG_COLUMN, "string", "log data", LOGDATA_CLASSIFICATION));

        List<AtlasEntity> productDimColumns  = ImmutableList.of(createColumn(PRODUCT_ID_COLUMN, "int", "product id"),
                                                                createColumn(PRODUCT_NAME_COLUMN, "string", "product name"),
                                                                createColumn(BRAND_NAME_COLUMN, "int", "brand name"));

        List<AtlasEntity> timeDimColumns     = ImmutableList.of(createColumn(TIME_ID_COLUMN, "int", "time id"),
                                                                createColumn(DAY_OF_YEAR_COLUMN, "int", "day Of Year"),
                                                                createColumn(WEEKDAY_COLUMN, "int", "week Day"));

        List<AtlasEntity> customerDimColumns = ImmutableList.of(createColumn(CUSTOMER_ID_COLUMN, "int", "customer id", PII_CLASSIFICATION),
                                                                createColumn(NAME_COLUMN, "string", "customer name", PII_CLASSIFICATION),
                                                                createColumn(ADDRESS_COLUMN, "string", "customer address", PII_CLASSIFICATION));

        // Table entities
        AtlasEntity salesFact          = createTable(SALES_FACT_TABLE, "sales fact table", salesDB, storageDesc,
                                                     "Joe", "Managed", salesFactColumns, FACT_CLASSIFICATION);
        AtlasEntity productDim         = createTable(PRODUCT_DIM_TABLE, "product dimension table", salesDB, storageDesc,
                                                     "John Doe", "Managed", productDimColumns, DIMENSION_CLASSIFICATION);
        AtlasEntity customerDim        = createTable(CUSTOMER_DIM_TABLE, "customer dimension table", salesDB, storageDesc,
                                                     "fetl", "External", customerDimColumns, DIMENSION_CLASSIFICATION);
        AtlasEntity timeDim            = createTable(TIME_DIM_TABLE, "time dimension table", salesDB, storageDesc,
                                                     "John Doe", "External", timeDimColumns, DIMENSION_CLASSIFICATION);
        AtlasEntity loggingFactDaily   = createTable(LOG_FACT_DAILY_MV_TABLE, "log fact daily materialized view", logDB,
                                                     storageDesc, "Tim ETL", "Managed", logFactColumns, LOGDATA_CLASSIFICATION);
        AtlasEntity loggingFactMonthly = createTable(LOG_FACT_MONTHLY_MV_TABLE, "logging fact monthly materialized view", logDB,
                                                     storageDesc, "Tim ETL", "Managed", logFactColumns, LOGDATA_CLASSIFICATION);
        AtlasEntity salesFactDaily     = createTable(SALES_FACT_DAILY_MV_TABLE, "sales fact daily materialized view", reportingDB,
                                                     storageDesc, "Joe BI", "Managed", salesFactColumns, METRIC_CLASSIFICATION);
        AtlasEntity salesFactMonthly   = createTable(SALES_FACT_MONTHLY_MV_TABLE, "sales fact monthly materialized view", reportingDB,
                                                     storageDesc, "Jane BI", "Managed", salesFactColumns, METRIC_CLASSIFICATION);

        // View entities
        createView(PRODUCT_DIM_VIEW, reportingDB, ImmutableList.of(productDim), DIMENSION_CLASSIFICATION, JDBC_CLASSIFICATION);
        createView(CUSTOMER_DIM_VIEW, reportingDB, ImmutableList.of(customerDim), DIMENSION_CLASSIFICATION, JDBC_CLASSIFICATION);

        // Process entities
        createProcess(LOAD_SALES_DAILY_PROCESS, "hive query for daily summary", "John ETL",
                      ImmutableList.of(salesFact, timeDim),
                      ImmutableList.of(salesFactDaily),
                      "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);

        createProcess(LOAD_SALES_MONTHLY_PROCESS, "hive query for monthly summary", "John ETL",
                      ImmutableList.of(salesFactDaily),
                      ImmutableList.of(salesFactMonthly),
                      "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);

        createProcess(LOAD_LOGS_MONTHLY_PROCESS, "hive query for monthly summary", "Tim ETL",
                      ImmutableList.of(loggingFactDaily),
                      ImmutableList.of(loggingFactMonthly),
                      "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
    }

    private AtlasEntity createInstance(AtlasEntity entity, String[] traitNames) throws Exception {
        AtlasEntity ret = null;
        EntityMutationResponse  response = atlasClientV2.createEntity(new AtlasEntityWithExtInfo(entity));
        List<AtlasEntityHeader> entities = response.getEntitiesByOperation(EntityOperation.CREATE);

        if (CollectionUtils.isNotEmpty(entities)) {
            AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());
            ret = getByGuidResponse.getEntity();
            System.out.println("Created entity of type [" + ret.getTypeName() + "], guid: " + ret.getGuid());
        }

        return ret;
    }

    AtlasEntity createDatabase(String name, String description, String owner, String locationUri, String... traitNames)
            throws Exception {
        AtlasEntity entity = new AtlasEntity(DATABASE_TYPE);

        entity.setClassifications(toAtlasClassifications(traitNames));
        entity.setAttribute("name", name);
        entity.setAttribute("description", description);
        entity.setAttribute("owner", owner);
        entity.setAttribute("locationuri", locationUri);
        entity.setAttribute("createTime", System.currentTimeMillis());

        return createInstance(entity, traitNames);
    }

    private List<AtlasClassification> toAtlasClassifications(String[] traitNames) {
        List<AtlasClassification> ret    = new ArrayList<>();
        ImmutableList<String>     traits = ImmutableList.copyOf(traitNames);

        if (CollectionUtils.isNotEmpty(traits)) {
            for (String trait : traits) {
                ret.add(new AtlasClassification(trait));
            }
        }

        return ret;
    }

    AtlasEntity createStorageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
            throws Exception {
        AtlasEntity entity = new AtlasEntity(STORAGE_DESC_TYPE);

        entity.setAttribute("location", location);
        entity.setAttribute("inputFormat", inputFormat);
        entity.setAttribute("outputFormat", outputFormat);
        entity.setAttribute("compressed", compressed);

        return createInstance(entity, null);
    }

    AtlasEntity createColumn(String name, String dataType, String comment, String... traitNames) throws Exception {

        AtlasEntity entity = new AtlasEntity(COLUMN_TYPE);
        entity.setClassifications(toAtlasClassifications(traitNames));
        entity.setAttribute("name", name);
        entity.setAttribute("dataType", dataType);
        entity.setAttribute("comment", comment);

        return createInstance(entity, traitNames);
    }

    AtlasEntity createTable(String name, String description, AtlasEntity db, AtlasEntity sd, String owner, String tableType,
                            List<AtlasEntity> columns, String... traitNames) throws Exception {
        AtlasEntity entity = new AtlasEntity(TABLE_TYPE);

        entity.setClassifications(toAtlasClassifications(traitNames));
        entity.setAttribute("name", name);
        entity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
        entity.setAttribute("description", description);
        entity.setAttribute("owner", owner);
        entity.setAttribute("tableType", tableType);
        entity.setAttribute("createTime", System.currentTimeMillis());
        entity.setAttribute("lastAccessTime", System.currentTimeMillis());
        entity.setAttribute("retention", System.currentTimeMillis());
        entity.setAttribute("db", AtlasTypeUtil.getAtlasObjectId(db));
        entity.setAttribute("sd", AtlasTypeUtil.getAtlasObjectId(sd));
        entity.setAttribute("columns", AtlasTypeUtil.toObjectIds(columns));

        return createInstance(entity, traitNames);
    }

    AtlasEntity createProcess(String name, String description, String user, List<AtlasEntity> inputs, List<AtlasEntity> outputs,
            String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
        AtlasEntity entity = new AtlasEntity(LOAD_PROCESS_TYPE);

        entity.setClassifications(toAtlasClassifications(traitNames));
        entity.setAttribute(AtlasClient.NAME, name);
        entity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
        entity.setAttribute("description", description);
        entity.setAttribute("inputs", inputs);
        entity.setAttribute("outputs", outputs);
        entity.setAttribute("user", user);
        entity.setAttribute("startTime", System.currentTimeMillis());
        entity.setAttribute("endTime", System.currentTimeMillis() + 10000);
        entity.setAttribute("queryText", queryText);
        entity.setAttribute("queryPlan", queryPlan);
        entity.setAttribute("queryId", queryId);
        entity.setAttribute("queryGraph", queryGraph);

        return createInstance(entity, traitNames);
    }

    AtlasEntity createView(String name, AtlasEntity db, List<AtlasEntity> inputTables, String... traitNames) throws Exception {
        AtlasEntity entity = new AtlasEntity(VIEW_TYPE);

        entity.setClassifications(toAtlasClassifications(traitNames));
        entity.setAttribute("name", name);
        entity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
        entity.setAttribute("db", db);
        entity.setAttribute("inputTables", inputTables);

        return createInstance(entity, traitNames);
    }

    private void verifyTypesCreated() throws Exception {
        MultivaluedMap<String, String> searchParams = new MultivaluedMapImpl();

        for (String typeName : TYPES) {
            searchParams.clear();
            searchParams.add(SearchFilter.PARAM_NAME, typeName);
            SearchFilter searchFilter = new SearchFilter(searchParams);
            AtlasTypesDef searchDefs = atlasClientV2.getAllTypeDefs(searchFilter);

            assert (!searchDefs.isEmpty());
            System.out.println("Created type [" + typeName + "]");
        }
    }

    private String[] getDSLQueries() {
        return new String[]{
                "from DB",
                "DB",
                "DB where name=\"Reporting\"",
                "DB where DB.name=\"Reporting\"",
                "DB name = \"Reporting\"",
                "DB DB.name = \"Reporting\"",
                "DB where name=\"Reporting\" select name, owner",
                "DB where DB.name=\"Reporting\" select name, owner",
                "DB has name",
                "DB where DB has name",
                "DB, Table",
                "DB is JdbcAccess",
                "from Table",
                "Table",
                "Table is Dimension",
                "Column where Column isa PII",
                "View is Dimension",
                "Column select Column.name",
                "Column select name",
                "Column where Column.name=\"customer_id\"",
                "from Table select Table.name",
                "DB where (name = \"Reporting\")",
                "DB where (name = \"Reporting\") select name as _col_0, owner as _col_1",
                "DB where DB is JdbcAccess",
                "DB where DB has name",
                "DB Table",
                "DB as db1 Table where (db1.name = \"Reporting\")",
                "DB where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 ",
                DIMENSION_CLASSIFICATION,
                JDBC_CLASSIFICATION,
                ETL_CLASSIFICATION,
                METRIC_CLASSIFICATION,
                PII_CLASSIFICATION,
                "`Log Data`",
                "Table where name=\"sales_fact\", columns",
                "Table where name=\"sales_fact\", columns as column select column.name, column.dataType, column.comment",
                "from DataSet",
                "from Process" };
    }

    private void search() throws Exception {
        System.out.println("\nSample DSL Queries: ");

        for (String dslQuery : getDSLQueries()) {
            AtlasSearchResult results = atlasClientV2.dslSearchWithParams(dslQuery, 10, 0);

            if (results != null) {
                List<AtlasEntityHeader>   entitiesResult  = results.getEntities();
                List<AtlasFullTextResult> fullTextResults = results.getFullTextResult();
                AttributeSearchResult     attribResult    = results.getAttributes();

                if (CollectionUtils.isNotEmpty(entitiesResult)) {
                    System.out.println("query [" + dslQuery + "] returned [" + entitiesResult.size() + "] rows.");
                } else if (CollectionUtils.isNotEmpty(fullTextResults)) {
                    System.out.println("query [" + dslQuery + "] returned [" + fullTextResults.size() + "] rows.");
                } else if (attribResult != null) {
                    System.out.println("query [" + dslQuery + "] returned [" + attribResult.getValues().size() + "] rows.");
                }
            } else {
                System.out.println("query [" + dslQuery + "] failed, results:" + results);
            }
        }
    }

    private void lineage() throws AtlasServiceException {
        System.out.println("\nSample Lineage Info: ");

        AtlasLineageInfo lineageInfo = atlasClientV2.getLineageInfo(getTableId(SALES_FACT_DAILY_MV_TABLE), LineageDirection.BOTH, 0);
        Set<LineageRelation> relations = lineageInfo.getRelations();
        Map<String, AtlasEntityHeader> guidEntityMap = lineageInfo.getGuidEntityMap();

        for (LineageRelation relation : relations) {
            AtlasEntityHeader fromEntity = guidEntityMap.get(relation.getFromEntityId());
            AtlasEntityHeader toEntity   = guidEntityMap.get(relation.getToEntityId());

            System.out.println(fromEntity.getDisplayText() + "(" + fromEntity.getTypeName() + ") -> " +
                               toEntity.getDisplayText()   + "(" + toEntity.getTypeName() + ")");
        }
    }

    private String getTableId(String tableName) throws AtlasServiceException {
        Map<String, String> attributes = new HashMap<>();
        attributes.put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableName);

        AtlasEntity tableEntity = atlasClientV2.getEntityByAttribute(TABLE_TYPE, attributes).getEntity();
        return tableEntity.getGuid();
    }
}
